Skip to main content

gRPC 学习之-中级知识

· 10 min read
Softwore Developer

gRPC 学习通信模型

中级知识

借助 gRPC 可以实现不同的进程间通信模式(也称RPC风格);本章将讨论 gRPC 应用的四种通信模式,一元RPC、服务端流RPC、客户端流RPC、以及双向RPC。

一元RPC

一元RPC:一元RPC模式也被称为简单RPC模式,当客户端调用服务端的远程方法时,客户端发送请求到服务端并获得一个响应,与响应一起发送还有元数据。

服务器端流 RPPC 模式

一元RPC模式:gRPC 服务器端和 gRPC 客户端在通信时始终只有一个请求和一个响应。 服务器端流 RPC 模式:gRPC 服务器端在接收到客户端的请求消息后,会发回一个响应序列(简单说就是会发回多个响应)。这种多个响应所组成的序列也被称为流。

例如:

在 OrderManagement 服务中,假设需要实现一个订单搜索功能;利用服务器端流 gRPC 模式时 OrderManagement 服务不会将所有匹配的订单一次性的发送给客户端,而是在找到匹配的订单时,逐步将其发送出去。

grpc-server-stream

  • grpc server
func (s *server) SearchOrders(searchQuery *wrappers.StringValue,stream pb.OrderManagement_SearchOrdersServer) error {
for key, order := range orderMap {
log.Print(key, order)
for _, itemStr := range order.Items {
log.Print(itemStr)
if strings.Contains(itemStr, searchQuery.Value) {
// Send the matching orders in a stream
err := stream.Send(&order)
if err != nil {
return fmt.Errorf("error sending message to stream : %v", err)
}
log.Print("Matching Order Found : " + key)
break
}
}
}
return nil
}
  • grpc clinet
func main() {
....
searchStream, _ := client.SearchOrders(ctx, &wrappers.StringValue{Value: "Google"})
for {
searchOrder, err := searchStream.Recv()
if err == io.EOF {
log.Print("EOF")
break
}

if err == nil {
log.Print("Search Result : ", searchOrder)
}
}
...
}

上述就是 gRPC 服务器端流模式,服务在检索到订单时,通过以流的形式发送给客户端 err := stream.Send(&order) ; 客户端在调用时,返回一个客户端流,它有一个 Recv 方法,调用客户端的 Recv 方法,可以逐个读取服务端写入的数据。当发现流结束时, Recv 会返回 io.EOF

客户端流 RPC 模式

客户端流RPC模式:客户端会发起多个请求给服务端,而不再是单个请求,服务器端则会发送一个响应给客户端。但是,服务端不一定要等到从客户端接受到所有消息后才发送响应。

例如:

在 OrderManagement 服务中希望添加 updateOrders 方法,从而更新一个订单集合。

grpc client stream

  • grpc server : 如下方法定义了一个服务端接收客户端流的方法
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
ordersStr := "Updated Order IDs : "
for {
order, err := stream.Recv()
if err == io.EOF {
// 客户端发送完流数据
return stream.SendAndClose(&wrappers.StringValue{Value: "Orders processed " + ordersStr})
}

if err != nil {
return err
}
// Update order
orderMap[order.Id] = *order

log.Printf("Order ID : %s - %s", order.Id, "Updated")
ordersStr += order.Id + ", "
}
}
  • grpc client
func main(){
....省略
// =========================================
// Update Orders : Client streaming scenario
updOrder1 := pb.Order{Id: "102", Items:[]string{"Google Pixel 3A", "Google Pixel Book"}, Destination:"Mountain View, CA", Price:1100.00}
updOrder2 := pb.Order{Id: "103", Items:[]string{"Apple Watch S4", "Mac Book Pro", "iPad Pro"}, Destination:"San Jose, CA", Price:2800.00}
updOrder3 := pb.Order{Id: "104", Items:[]string{"Google Home Mini", "Google Nest Hub", "iPad Mini"}, Destination:"Mountain View, CA", Price:2200.00}

updateStream, err := client.UpdateOrders(ctx)

if err != nil {
log.Fatalf("%v.UpdateOrders(_) = _, %v", client, err)
}

// Updating order 1
if err := updateStream.Send(&updOrder1); err != nil {
log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder1, err)
}

// Updating order 2
if err := updateStream.Send(&updOrder2); err != nil {
log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder2, err)
}

// Updating order 3
if err := updateStream.Send(&updOrder3); err != nil {
log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder3, err)
}

updateRes, err := updateStream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)
}
log.Printf("Update Orders Res : %s", updateRes)

}

客户端以流的形式发送updateStream.Send(&updOrder2)给服务端,一旦所有消息都以流的形式发送出去,客户端就可以将流标记为已完成,并通过 CloseAndRecv 方法来读取服务端的响应。

双向流 RPC 模式

双向流RPC模式:客户端以消息流的形式发送请求到服务器端,服务器端也以消息流的形式进行响应。调用必须由客户端发起,但在此之后,通信完全基于 gRPC 客户端和服务器端的应用程序逻辑。

例如:

在 OrderManagement 服务中,假设需要一个订单处理功能,通过该功能,用户可以发送连续的订单集合,并根据投递地址对它们进行组合发货。

grpc-double-stream

这个业务用例的关键步骤如下所示:

  • 每个订单以独立的 gRPC 消息的形式发送至服务器端。
  • 每个发货组合可能会包含多个订单,它们应该被投递到相同的目的地。
  • 订单是成批处理的,当达到指定的批次大小时,当前创建的所有发货组合都会被发送至客户端。
  • 假设:流中有4个订单,其中有两个订单要发送至X,两个要发送至Y,则可以将其表示为:X、Y、X、Y。如果批次大小为3,那么所创建的订单发货组合会是[X,X],[Y],[Y]。

gRpc Server:主要逻辑就是通过客户端发送过来的订单ID按照指定批次大小以及发货地址进行分组,再把分组后的信息返回给客户端。

const orderBatchSize = 3
func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error {
batchMarker := 1
var combinedShipmentMap = make(map[string]pb.CombinedShipment)
for {
orderId, err := stream.Recv()
log.Printf("Reading Proc order : %s", orderId)
if err == io.EOF {
// 客户端发送关闭时,需要把服务端已经接收到的进行发送
log.Printf("EOF : %s", orderId)
for _, shipment := range combinedShipmentMap {
if err := stream.Send(&shipment); err != nil {
return err
}
}
return nil
}
if err != nil {
log.Println(err)
return err
}

destination := orderMap[orderId.GetValue()].Destination
shipment, found := combinedShipmentMap[destination]

if found {
ord := orderMap[orderId.GetValue()]
shipment.OrdersList = append(shipment.OrdersList, &ord)
combinedShipmentMap[destination] = shipment
} else {
comShip := pb.CombinedShipment{Id: "cmb - " + (orderMap[orderId.GetValue()].Destination), Status: "Processed!", }
ord := orderMap[orderId.GetValue()]
comShip.OrdersList = append(shipment.OrdersList, &ord)
combinedShipmentMap[destination] = comShip
log.Print(len(comShip.OrdersList), comShip.GetId())
}

if batchMarker == orderBatchSize {
for _, comb := range combinedShipmentMap {
log.Printf("Shipping : %v -> %v" , comb.Id, len(comb.OrdersList))
if err := stream.Send(&comb); err != nil {
return err
}
}
batchMarker = 0
combinedShipmentMap = make(map[string]pb.CombinedShipment)
} else {
batchMarker++
}
}
}

gRpc Client:主要逻辑就是发送订单ID给服务端,之后开启 Go协程来读取服务端返回的消息。

func main(){
...
// =========================================
// Process Order : Bi-di streaming scenario
streamProcOrder, err := client.ProcessOrders(ctx)
if err != nil {
log.Fatalf("%v.ProcessOrders(_) = _, %v", client, err)
}

if err := streamProcOrder.Send(&wrappers.StringValue{Value:"102"}); err != nil {
log.Fatalf("%v.Send(%v) = %v", client, "102", err)
}

if err := streamProcOrder.Send(&wrappers.StringValue{Value:"103"}); err != nil {
log.Fatalf("%v.Send(%v) = %v", client, "103", err)
}

if err := streamProcOrder.Send(&wrappers.StringValue{Value:"104"}); err != nil {
log.Fatalf("%v.Send(%v) = %v", client, "104", err)
}

if err := streamProcOrder.Send(&wrappers.StringValue{Value:"101"}); err != nil {
log.Fatalf("%v.Send(%v) = %v", client, "101", err)
}

channel := make(chan struct{})
go asncClientBidirectionalRPC(streamProcOrder, channel)

if err := streamProcOrder.CloseSend(); err != nil {
log.Fatal(err)
}
channel <- struct{}{}

}

func asncClientBidirectionalRPC(streamProcOrder pb.OrderManagement_ProcessOrdersClient, c chan struct{}) {
for {
combinedShipment, errProcOrder := streamProcOrder.Recv()
if errProcOrder == io.EOF { // 判断服务端流是否结束;服务端方法只要有return就意味着流结束流了,可能是因为正常结束,也可能是错误导致结束,总之只要return之后双向流就断开了。
break
}
log.Printf("Combined shipment : ", combinedShipment.OrdersList)
}
<-c
}

在双向流中可以通过定义客户端流和服务端流的方式来实现,客户端和服务端都定义了 SendRecv 方法。可以实现双向传输。

// 客户端流
type OrderManagement_ProcessOrdersClient interface {
Send(*wrappers.StringValue) error
Recv() (*CombinedShipment, error)
grpc.ClientStream
}

// 服务端流
type OrderManagement_ProcessOrdersServer interface {
Send(*CombinedShipment) error
Recv() (*wrappers.StringValue, error)
grpc.ServerStream
}

客户端可以并发读取和写入同一个流,输入流和输出流可以独立进行操作。